#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <list.h>
#include "lsv_volume_proto.h"
#include "lsv_volume.h"
#include "lsv_bitmap.h"
#include "lsv_bitmap_internal.h"
#include "lsv_help.h"

int lsv_bitmap_cache_init(struct lsv_bitmap_context * node)
{
        const size_t table_bytes = MAX_COUNT_CACHE * sizeof(void *);

        /* Start from a clean context: counters, cursors, lock storage. */
        memset(&node->cache_context, 0, sizeof(node->cache_context));

        /* The slot table holds one pointer per cache unit; the units
         * themselves are allocated lazily on first use. */
        node->cache_context.bitmap_cache = xmalloc(table_bytes);
        assert(node->cache_context.bitmap_cache);
        memset(node->cache_context.bitmap_cache, 0, table_bytes);

        /* Both cursors begin at slot 0. */
        node->cache_context.free_pos = 0;
        node->cache_context.cur_pos = 0;

        lsv_lock_init(&node->cache_context.lock);

        LSV_DBUG("lsv bitmap initialized, cache count=%d\r\n", MAX_COUNT_CACHE);

        return 0;
}

/*
 * Tear down the bitmap cache: free every allocated cache unit, then the
 * slot table, then destroy the context lock.
 *
 * Freed pointers are cleared so that any stray access after teardown
 * faults on NULL instead of silently touching freed memory (the
 * cache_context lives inside @node, which outlives this call).
 *
 * NOTE(review): each unit is freed as a single xmalloc'd object — this
 * assumes lsv_bitmap_cache_unit_t embeds its bitmap buffer inline (the
 * allocation site only xmalloc's sizeof(lsv_bitmap_cache_unit_t)); confirm.
 */
int lsv_bitmap_cache_free(struct lsv_bitmap_context * node)
{
        for(int i=0;i<MAX_COUNT_CACHE;i++)
        {
                if(node->cache_context.bitmap_cache[i])
                {
                        xfree(node->cache_context.bitmap_cache[i]);
                        node->cache_context.bitmap_cache[i] = NULL;
                }
        }

        xfree(node->cache_context.bitmap_cache);
        node->cache_context.bitmap_cache = NULL;

        lsv_lock_destroy(&node->cache_context.lock);

        return 0;
}

int lsv_is_bitmap_cache_dirty(lsv_bitmap_cache_unit_t * cache)
{
        /* The chunk is dirty iff any bit of the per-page dirty map is set. */
        static const uint8_t clean[BITMAP_CHUNK_SIZE / PAGE_SIZE / 8];

        return memcmp(cache->dirty_bits, clean, sizeof(clean)) != 0;
}

static inline int lsv_is_bitmap_cache_page_dirty(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx)
{
        /* Test bit page_idx of the dirty map: byte page_idx/8, bit page_idx%8. */
        return cache->dirty_bits[page_idx >> 3] & (1u << (page_idx & 7u));
}

void lsv_bitmap_cache_mark_dirty(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Set the dirty bit for every page in [page_idx, page_idx + n_pages). */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->dirty_bits[p >> 3] |= (uint8_t)(1u << (p & 7u));
}

void lsv_bitmap_cache_clear_dirty(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Clear the dirty bit for every page in [page_idx, page_idx + n_pages). */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->dirty_bits[p >> 3] &= (uint8_t)~(1u << (p & 7u));
}

static inline int lsv_is_bitmap_cache_page_valid(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* True only when every page in [page_idx, page_idx + n_pages)
         * has its valid bit set. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
        {
                if((cache->valid_bits[p >> 3] & (1u << (p & 7u))) == 0)
                        return 0;
        }

        return 1;
}

void lsv_bitmap_cache_mark_valid(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Set the valid bit for every page in [page_idx, page_idx + n_pages). */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->valid_bits[p >> 3] |= (uint8_t)(1u << (p & 7u));
}

void lsv_bitmap_cache_clear_valid(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Clear the valid bit for every page in [page_idx, page_idx + n_pages). */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->valid_bits[p >> 3] &= (uint8_t)~(1u << (p & 7u));
}

static inline void lsv_bitmap_cache_page_lock_init(lsv_bitmap_cache_unit_t * cache)
{
        /* Give every page of the chunk its own reader/writer lock. */
        for(int idx = 0; idx < PAGE_PER_CHUNK; idx ++)
                lsv_rwlock_init(&cache->dirty_lock[idx]);
}

static inline void lsv_bitmap_cache_page_wlock(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        #if LSV_BITMAP_USE_PAGE_LOCK
        /* Write-lock each page in the range, in ascending order.
         * No-op when page locking is compiled out. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                lsv_wrlock(&cache->dirty_lock[p]);
        #endif
}

static inline int lsv_is_bitmap_cache_loading(lsv_bitmap_cache_unit_t * cache)
{
        /* A load is in flight iff any bit of the per-page load map is set. */
        static const uint8_t idle[BITMAP_CHUNK_SIZE / PAGE_SIZE / 8];

        return memcmp(cache->load_bits, idle, sizeof(idle)) != 0;
}

void lsv_bitmap_cache_mark_load(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Set the load-in-progress bit for every page in the range. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->load_bits[p >> 3] |= (uint8_t)(1u << (p & 7u));
}

void lsv_bitmap_cache_clear_load(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        /* Clear the load-in-progress bit for every page in the range. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                cache->load_bits[p >> 3] &= (uint8_t)~(1u << (p & 7u));
}

/* Try to write-lock a single page without blocking.  Returns the
 * lsv_trywrlock() result (callers treat nonzero as "already locked /
 * load in progress"), or always 0 when page locking is compiled out. */
static inline int lsv_bitmap_cache_page_try_wlock(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx)  //must be 1 page.
{
        #if LSV_BITMAP_USE_PAGE_LOCK
        return lsv_trywrlock(&cache->dirty_lock[page_idx]);
        #else
        return 0;
        #endif
}

static inline void lsv_bitmap_cache_page_rlock(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        #if LSV_BITMAP_USE_PAGE_LOCK
        /* Read-lock each page in the range, in ascending order.
         * No-op when page locking is compiled out. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                lsv_rdlock(&cache->dirty_lock[p]);
        #endif
}

void lsv_bitmap_cache_page_unlock(lsv_bitmap_cache_unit_t * cache, uint32_t page_idx, uint32_t n_pages)
{
        #if LSV_BITMAP_USE_PAGE_LOCK
        /* Release the page locks taken by the rlock/wlock helpers.
         * No-op when page locking is compiled out. */
        for(uint32_t p = page_idx; p < page_idx + n_pages; p ++)
                lsv_rwunlock(&cache->dirty_lock[p]);
        #endif
}

void lsv_bitmap_cache_reference(lsv_bitmap_cache_unit_t * cache)
{
        /* Pin the unit.  NOTE(review): plain increment, not atomic —
         * presumably serialized by the coroutine scheduler; confirm. */
        cache->ref_count += 1;
}

void lsv_bitmap_cache_dereference(lsv_bitmap_cache_unit_t * cache)
{
        /* Unpin the unit.  NOTE(review): plain decrement, not atomic —
         * presumably serialized by the coroutine scheduler; confirm. */
        cache->ref_count -= 1;
}

int lsv_bitmap_commit_dirty_node(struct lsv_bitmap_context * node)
{
        lsv_volume_proto_t *lsv_info = node->volume_context;
        struct timeval t1, t2;

        count_list_t io_list_head;
        count_list_init(&io_list_head);

        _gettimeofday(&t1, NULL);

        for(int i=0;i<MAX_COUNT_CACHE;i++)
        {
                lsv_bitmap_cache_unit_t * cache = node->cache_context.bitmap_cache[i];
                if(!cache)
                        continue;

                int page_index = -1;
                int npages = 0;

                for(int page_idx=0;page_idx<BITMAP_CHUNK_SIZE / PAGE_SIZE;page_idx ++)
                {
                        if(!lsv_is_bitmap_cache_page_dirty(cache, page_idx))
                                continue;

                        if(!npages)
                        {
                                page_index = page_idx;
                                npages = 1;
                        }
                        else if(page_index + npages == page_idx)
                        {
                                npages ++;
                        }
                        else
                        {
                                struct aio_list_entry * entry = (struct aio_list_entry *)xmalloc(sizeof(struct aio_list_entry));
                                lsv_lock_init(&entry->lock);
                                lsv_lock(&entry->lock);
                                entry->cache_unit = cache;
                                _gettimeofday(&entry->tim, NULL);

                                count_list_add_tail(&entry->hook, &io_list_head);

                                uint32_t page_off = page_index * PAGE_SIZE;
                                uint32_t page_len = npages * PAGE_SIZE;

                                lsv_volume_io_init(&entry->vio, cache->chunk_id, page_off, page_len, LSV_BITMAP_STORAGE_TYPE);

                                #if LSV_BITMAP_USE_PAGE_LOCK
                                        lsv_bitmap_cache_page_wlock(entry->cache_unit, entry->vio.offset / PAGE_SIZE, entry->vio.size / PAGE_SIZE);
                                #endif

                                int ret = lsv_bitmap_write_chunk_async(node->volume_context,
                                                             cache->bitmap_cache + page_off, &entry->vio, entry);
                                if(ret)
                                {
                                        LSV_DBUG("lsv bitmap cache commit error %d\r\n",ret);

                                        return ret;
                                }

                                lsv_info->bitmap_write_count++;

                                page_index = page_idx;
                                npages = 1;
                        }
                }

                if(npages)
                {
                        struct aio_list_entry * entry = (struct aio_list_entry *)xmalloc(sizeof(struct aio_list_entry));
                        lsv_lock_init(&entry->lock);
                        lsv_lock(&entry->lock);
                        entry->cache_unit = cache;
                        _gettimeofday(&entry->tim, NULL);

                        count_list_add_tail(&entry->hook, &io_list_head);

                        uint32_t page_off = page_index * PAGE_SIZE;
                        uint32_t page_len = npages * PAGE_SIZE;

                        lsv_volume_io_init(&entry->vio, cache->chunk_id, page_off, page_len, LSV_BITMAP_STORAGE_TYPE);

                        #if LSV_BITMAP_USE_PAGE_LOCK
                                lsv_bitmap_cache_page_wlock(entry->cache_unit, entry->vio.offset / PAGE_SIZE, entry->vio.size / PAGE_SIZE);
                        #endif

                        int ret = lsv_bitmap_write_chunk_async(node->volume_context,
                                                         cache->bitmap_cache + page_off, &entry->vio, entry);
                        if(ret)
                        {
                                LSV_DBUG("lsv bitmap cache commit error %d\r\n",ret);

                                return ret;
                        }

                        lsv_info->bitmap_write_count++;
                }

                #ifndef LSV_BITMAP_USE_PAGE_LOCK
                memset(cache->dirty_bits, 0, sizeof(cache->dirty_bits)); //异步有问题啊
                #endif
        }

        uint32_t count = io_list_head.count;

        struct list_head *pos;
        struct list_head *n;
        aio_list_entry_t *entry;
        list_for_each_safe(pos, n, &io_list_head.list) {
                entry = (aio_list_entry_t *)pos;
                lsv_lock(&entry->lock);

                list_del_init(pos);

                xfree(entry);
        }

        _gettimeofday(&t2, NULL);
        int64_t used = _time_used(&t1, &t2);
        DERROR("bitmap commit used time %lld count %u\r\n", (LLU)used, count);

        return 0;
}

//dirty mechanism is not used on row2; it should not be executed on that volume.

int lsv_bitmap_commit_dirty(void *volume_context)
{
        /* Flush all dirty bitmap pages under the bitmap read lock;
         * on success, close one pending session. */
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);
        int ret;

        lsv_bitmap_rlock(volume_context);

        ret = lsv_bitmap_commit_dirty_node(node);
        if(ret == 0 && node->is_session > 0)
                node->is_session --;

        lsv_bitmap_unlock(volume_context);

        return ret;
}

void lsv_bitmap_start_session_node(struct lsv_bitmap_context * node)
{
        /* Open one more session; balanced by the decrement on a
         * successful commit in lsv_bitmap_commit_dirty(). */
        node->is_session += 1;
}

int lsv_bitmap_start_session(void *volume_context)
{
        /* Bump the session count while holding the node lock. */
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_lock_node(node);
        lsv_bitmap_start_session_node(node);
        lsv_bitmap_unlock_node(node);

        return 0;
}


/*
 * Look up (or allocate) the cache slot for @chunk_id/@vvol_id and return
 * its in-memory bitmap buffer.  Lookup proceeds in four levels:
 *   1. the most recently used slot (cache_context.cur_pos);
 *   2. a full scan of all slots;
 *   3. a not-yet-allocated slot, allocated on the spot;
 *   4. evict the clean, non-loading slot with the lowest access ratio
 *      and (re)load its contents from local or remote storage.
 * On success the returned slot holds a read lock (a write lock while being
 * repopulated) and, when @ref is non-NULL, *ref receives the slot so the
 * caller can unlock it later.  Returns NULL with errno set on load error.
 */
uint8_t * lsv_bitmap_cache_get_internal(struct lsv_bitmap_context * node, uint32_t vvol_id, uint32_t chunk_id, int load_all, lsv_bitmap_cache_unit_t **ref)
{
        uint32_t index = node->cache_context.cur_pos;   /* most recently used slot */
        int unused = -1;        /* first unallocated slot found during the scan */
        int ret = 0;
        int lowest = -1;        /* eviction candidate: clean slot with smallest ratio */
        uint32_t ratio = -1;    /* smallest ratio seen so far (starts at UINT32_MAX) */
        uint32_t max_ratio = 0; /* largest ratio seen; a repopulated slot starts here */
        int wlock = 0;          /* set when the slot was taken with a write lock */
        //level1, last accessed unit.
BEGIN:
        if(index < MAX_COUNT_CACHE && node->cache_context.bitmap_cache[index] &&
           node->cache_context.bitmap_cache[index]->chunk_id == chunk_id &&
           (!vvol_id || node->cache_context.bitmap_cache[index]->vvol_id == vvol_id))
        {
                if(lsv_tryrdlock(&node->cache_context.bitmap_cache[index]->slot_lock))
                {
                        //schedule_sleep("begin bitmap cache", 1);
                        //DFATAL("cache impossible on recent.\r\n");

                        /* Slot is write-locked (being paged out): wait by taking
                         * and immediately dropping the read lock, then re-check
                         * from the top — the slot's identity may have changed.
                         * (Original author's note: odd, but no better way found
                         * in this concurrent environment.) */
                        lsv_rdlock(&node->cache_context.bitmap_cache[index]->slot_lock);
                        lsv_unlock(&node->cache_context.bitmap_cache[index]->slot_lock);

                        goto BEGIN;
                }

                node->cache_context.bitmap_cache[index]->ratio ++;

                if(ref)
                        *ref = node->cache_context.bitmap_cache[index];

                LSV_DBUG("lsv bitmap cache reuse at last pos: vvol_id=%d, chunk_id=%d\r\n",vvol_id, chunk_id);

                return node->cache_context.bitmap_cache[index]->bitmap_cache;
        }

        //if(node->cache_context.bitmap_cache[index] && node->cache_context.bitmap_cache[index]->ratio > 0)
        //        node->cache_context.bitmap_cache[index]->ratio --;

RETRY:
        //level2, find one allocated.
        for(int i=0;i<MAX_COUNT_CACHE;i++)
        {
                if(node->cache_context.bitmap_cache[i] && node->cache_context.bitmap_cache[i]->chunk_id == chunk_id &&
                   ( !vvol_id || node->cache_context.bitmap_cache[i]->vvol_id == vvol_id) )
                {
                        if(lsv_tryrdlock(&node->cache_context.bitmap_cache[i]->slot_lock))
                        {
                                //schedule_sleep("begin bitmap cache", 1);
                                //DFATAL("cache impossible on some..\r\n");
                                // TODO core timeout
                                /* Same wait-then-rescan trick as level 1. */
                                lsv_rdlock(&node->cache_context.bitmap_cache[i]->slot_lock);
                                lsv_unlock(&node->cache_context.bitmap_cache[i]->slot_lock);

                                goto RETRY;
                        }

                        /* Remember this slot as the most recently used. */
                        node->cache_context.cur_pos = i;

                        node->cache_context.bitmap_cache[i]->ratio ++;

                        LSV_DBUG("lsv bitmap cache reuse at %d vvol_id=%d, chunk_id=%d\r\n",i, vvol_id, chunk_id);

                        if(ref)
                                *ref = node->cache_context.bitmap_cache[i];

                        return node->cache_context.bitmap_cache[i]->bitmap_cache;
                }
                else if(unused == -1 && !node->cache_context.bitmap_cache[i])           //not allocated.
                        unused = i;

                /* Track the eviction candidate: lowest ratio among slots that
                 * are neither dirty nor currently loading. */
                if(node->cache_context.bitmap_cache[i] && ratio > node->cache_context.bitmap_cache[i]->ratio &&
                   !lsv_is_bitmap_cache_dirty(node->cache_context.bitmap_cache[i])&&
                   !lsv_is_bitmap_cache_loading(node->cache_context.bitmap_cache[i]))
                {
                        ratio = node->cache_context.bitmap_cache[i]->ratio;
                        lowest = i;
                }

                if(node->cache_context.bitmap_cache[i] && max_ratio < node->cache_context.bitmap_cache[i]->ratio)
                {
                        max_ratio = node->cache_context.bitmap_cache[i]->ratio;
                }
        }

        //level3, find in unallocated.
        if(unused >= 0)
        {
                LSV_DBUG("lsv bitmap cache allocated at %d, vvol_id=%d, chunk_id=%d\r\n",unused, vvol_id, chunk_id);

                node->cache_context.bitmap_cache[unused] = xmalloc(sizeof(lsv_bitmap_cache_unit_t));

                /* NOTE(review): ratio is set to -1 here but unconditionally
                 * overwritten with max_ratio further down — confirm intended. */
                node->cache_context.bitmap_cache[unused]->ratio = -1;

                node->cache_context.bitmap_cache[unused]->ref_count = 0;

                /* vvol_id 0 means "the local volume": fall back to the header's id. */
                node->cache_context.bitmap_cache[unused]->vvol_id = vvol_id?vvol_id:node->bitmap_header->vvol_id;

                lsv_bitmap_cache_page_lock_init(node->cache_context.bitmap_cache[unused]);

                lsv_rwlock_init(&node->cache_context.bitmap_cache[unused]->slot_lock);

                /* Freshly initialized lock: the read lock should succeed on the
                 * first try; the loop only guards against the unexpected. */
                while(lsv_tryrdlock(&node->cache_context.bitmap_cache[unused]->slot_lock))
                {
                        schedule_sleep("begin bitmap cache", 1);
                        DFATAL("cache impossible on allocate.\r\n");
                }
        }
        else
        {
                if(lowest == -1)
                {
                        //all dirty.

                        LSV_DBUG("lsv bitmap cache full, all dirty, retrying...\r\n");

                        /*ret = lsv_bitmap_commit_dirty_node(node);
                        if(!ret && node->is_session > 0)
                                node->is_session  --;

                        if(ret)
                                LSV_DBUG("lsv bitmap cache flush dirty page error %d, retrying...\r\n", ret);
*/
                        //schedule_sleep("row cache", 1); //for row2

                        /* Every slot is dirty or loading: block on slot 0's lock
                         * to yield until something changes, then rescan. */
                        lsv_wrlock(&node->cache_context.bitmap_cache[0]->slot_lock);
                        lsv_unlock(&node->cache_context.bitmap_cache[0]->slot_lock);

                        goto RETRY;
                }

                // TODO when MAX_COUNT_CACHE is small, lock will timeout
                /* Take the write lock to page the victim out; a nonzero return
                 * (lock failure) bumps the victim's ratio and rescans. */
                if(lsv_wrlock(&node->cache_context.bitmap_cache[lowest]->slot_lock))
                {
                        node->cache_context.bitmap_cache[lowest]->ratio ++;

                        schedule_sleep("begin bitmap cache", 1);

                        DFATAL("cache impossible on paging out.\r\n");

                        goto RETRY;
                }

                wlock = 1;

                LSV_DBUG("lsv bitmap cache changed to pos %d, vvol_id=%d, chunk_id=%d\r\n",lowest, vvol_id, chunk_id);

                unused = lowest;
        }

        //level4, free and load one.
        if(chunk_id == 0) //means allocated new chunk.
        {
                /* Brand-new chunk: start with an all-zero bitmap, every page valid. */
                memset(node->cache_context.bitmap_cache[unused]->bitmap_cache, 0, BITMAP_CHUNK_SIZE);
                memset(node->cache_context.bitmap_cache[unused]->valid_bits, 0xff, sizeof(node->cache_context.bitmap_cache[unused]->valid_bits)); //all valid.

                lsv_bitmap_cache_page_lock_init(node->cache_context.bitmap_cache[unused]);
        }
        else if(node->cache_context.bitmap_cache[unused]->vvol_id == node->bitmap_header->vvol_id)
        {
                /* The slot's previous owner was the local volume: load locally. */
                LSV_DBUG("load_bitmap cache local at pos %d, vvol_id=%d, chunk_id=%d, load_all=%d\r\n",unused, vvol_id, chunk_id, load_all);

                if(load_all)
                {

                        //lsv_bitmap_cache_mark_load(node->cache_context.bitmap_cache[unused], 0, BITMAP_CHUNK_SIZE / PAGE_SIZE); //yeild protection.

                        ret = lsv_bitmap_read_chunk(node->volume_context, 0, chunk_id, 0, BITMAP_CHUNK_SIZE,
                                        node->cache_context.bitmap_cache[unused]->bitmap_cache);

                        if(ret)
                        {
                                lsv_unlock(&node->cache_context.bitmap_cache[unused]->slot_lock);

                                errno = ret;
                                return NULL;
                        }
                        //lsv_bitmap_cache_clear_load(node->cache_context.bitmap_cache[unused], 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                        memset(node->cache_context.bitmap_cache[unused]->valid_bits, 0xff, sizeof(node->cache_context.bitmap_cache[unused]->valid_bits)); //all valid.
                }
                else
                {
                        memset(node->cache_context.bitmap_cache[unused]->valid_bits, 0, sizeof(node->cache_context.bitmap_cache[unused]->valid_bits)); //load later.

                        #if LSV_BITMAP_CHECK_BITMAP_LBA
                        memset(node->cache_context.bitmap_cache[unused]->bitmap_cache, 0xff, BITMAP_CHUNK_SIZE);
                        #endif
                }
        }
        else
        {
                /* Remote owner: read through the remote chunk path.
                 * NOTE(review): this log prints 'lowest', which is -1 when the
                 * slot came from the unallocated (level 3) path. */
                LSV_DBUG("load_bitmap cache remote at pos %d, vvol_id=%d, chunk_id=%d\r\n",lowest, vvol_id, chunk_id);

                if(load_all)
                {
                        uint64_t vol_id;

                        lsv_bitmap_vvol_to_vol(node->volume_context, vvol_id, &vol_id);

                        //lsv_bitmap_cache_mark_load(node->cache_context.bitmap_cache[unused], 0, BITMAP_CHUNK_SIZE / PAGE_SIZE); //yeild protection.

                        /* NOTE(review): reads CHUNK_SIZE here but BITMAP_CHUNK_SIZE
                         * in the local branch — confirm these are equal. */
                        ret = volume_proto_remote_read_chunk(node->volume_context, vol_id, chunk_id, 0, CHUNK_SIZE,
                                                     node->cache_context.bitmap_cache[unused]->bitmap_cache);

                        if(ret)
                        {
                                lsv_unlock(&node->cache_context.bitmap_cache[unused]->slot_lock);
                                errno = ret;
                                return NULL;
                        }
                        //lsv_bitmap_cache_clear_load(node->cache_context.bitmap_cache[unused], 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                        memset(node->cache_context.bitmap_cache[unused]->valid_bits, 0xff, sizeof(node->cache_context.bitmap_cache[unused]->valid_bits)); //all valid.
                }
                else
                {
                        memset(node->cache_context.bitmap_cache[unused]->valid_bits, 0, sizeof(node->cache_context.bitmap_cache[unused]->valid_bits)); //load later.
                        #if LSV_BITMAP_CHECK_BITMAP_LBA
                        memset(node->cache_context.bitmap_cache[unused]->bitmap_cache, 0xff, BITMAP_CHUNK_SIZE);
                        #endif
                }
        }

        /* Defensive re-check; both load paths above already return on error. */
        if(ret)
        {
                DERROR("fatal erro, lsv_bitmap_read_chunk %d error %d\r\n", chunk_id, -errno);
                lsv_unlock(&node->cache_context.bitmap_cache[unused]->slot_lock);
                //fatal error.
                return NULL;
        }

        /* Stamp the slot with its new identity; chunk 0 is stored as -1 so it
         * never matches a real chunk lookup.  The new slot inherits the current
         * max ratio so it is not immediately evicted. */
        node->cache_context.bitmap_cache[unused]->chunk_id = chunk_id==0?-1:chunk_id;
        node->cache_context.bitmap_cache[unused]->vvol_id = vvol_id?vvol_id:node->bitmap_header->vvol_id;
        node->cache_context.bitmap_cache[unused]->ratio = max_ratio;
        memset(node->cache_context.bitmap_cache[unused]->dirty_bits, 0, sizeof(node->cache_context.bitmap_cache[unused]->dirty_bits));
        memset(node->cache_context.bitmap_cache[unused]->load_bits, 0, sizeof(node->cache_context.bitmap_cache[unused]->load_bits));

        if(wlock) //only and must work in coroutine
        {
                /* NOTE(review): the write lock taken for eviction is deliberately
                 * NOT downgraded/released here (the commented code below shows the
                 * abandoned attempt) — the caller unlocks via *ref. Confirm. */
                //lsv_unlock(&node->cache_context.bitmap_cache[unused]->slot_lock);
                //lsv_tryrdlock(&node->cache_context.bitmap_cache[unused]->slot_lock);
        }

        if(ref)
                *ref = node->cache_context.bitmap_cache[unused];

        return node->cache_context.bitmap_cache[unused]->bitmap_cache;
}

/**
 * important!!
 *
 * This is the soul of the bitmap and its most important part. There are a number of cache slots, each slot representing an upper-layer application.
 * Too many random I/Os cause many slots to swap in and out; page locks improve concurrent performance and the slot lock prevents paging out.
 *
 * @note A mechanism is needed to pin a cache unit so it cannot be swapped out.
 * For example, if the first task holds a cache unit and there is no synchronization,
 * a later task could swap that cache unit out, leading to unpredictable results.
 *
 * When a COW (copy-on-write) occurs, two cache units are needed.
 *
 * @note ref carries the slot lock
 * @note one or more page locks are held
 *
 * @param node
 * @param vvol_id
 * @param chunk_id
 * @param page_off
 * @param page_len
 * @param load_all
 * @param ref
 * @return
 */
uint8_t *lsv_bitmap_cache_get(struct lsv_bitmap_context * node, uint32_t vvol_id, uint32_t chunk_id, uint32_t page_off,
                               uint32_t page_len, int load_all, lsv_bitmap_cache_unit_t **ref)
{
        int ret;
        lsv_bitmap_cache_unit_t *cache_unit;
        uint8_t *cache;
        int page_idx;

        /* chunk 0 means a freshly allocated chunk: there is nothing on disk
         * to page in, so force a full (zero-filled) load. */
        if(!chunk_id)
                load_all = 1;

#if !LSV_BITMAP_USE_PAGE_LOAD
load_all = 1;
#endif

AGAIN:
        /* Find/allocate the slot; on return it is slot-locked and *cache_unit set. */
        cache = lsv_bitmap_cache_get_internal(node, vvol_id, chunk_id, load_all, &cache_unit);
        if(!cache)
                return NULL;

        if(chunk_id)
                assert(chunk_id == cache_unit->chunk_id);

        /* Page-granular load path: for each requested page that is not yet
         * valid, take its write lock and read it in.  Skipped entirely when
         * load_all is set (see the loop condition). */
        for(int i=0;i<page_len / PAGE_SIZE && !load_all;i++)
        {
                page_idx = page_off / PAGE_SIZE + i;

                if(!lsv_is_bitmap_cache_page_valid(cache_unit, page_idx, 1))
                {
                        lsv_bitmap_cache_page_wlock(cache_unit, page_idx, 1);

                        /* The slot may have been paged out while we waited for
                         * the page lock; if its identity changed, start over. */
                        if( ( (cache_unit->vvol_id != vvol_id) && vvol_id != 0) || (cache_unit->chunk_id != chunk_id) ) //c99
                        {
                                DERROR("bitmap_cache unit chunk_id=%d has been replaced, try again...", chunk_id);

                                lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                lsv_bitmap_cache_unlock(cache_unit);

                                goto AGAIN;
                        }

                        //check again, if valid no need load.
                        if(lsv_is_bitmap_cache_page_valid(cache_unit, page_idx, 1))
                        {
                                lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                continue;
                        }

                        if(!vvol_id || cache_unit->vvol_id == node->bitmap_header->vvol_id)
                        {
                                /* Local volume: synchronous single-page read. */
                                LSV_DBUG("load_bitmap_page cache local at vvol_id=%d, chunk_id=%d\r\n", vvol_id, chunk_id);

                                /*struct aio_list_entry * entry = (struct aio_list_entry *)malloc(sizeof(struct aio_list_entry));
                                lsv_lock_init(&entry->lock);
                                lsv_lock(&entry->lock);
                                entry->cache_unit = cache_unit;
                                _gettimeofday(&entry->tim, NULL);

                                lsv_volume_io_init(&entry->vio, cache_unit->chunk_id, page_idx * PAGE_SIZE, PAGE_SIZE, LSV_BITMAP_STORAGE_TYPE);
*/
                                /* load bit marks the page as in-flight across any yield. */
                                lsv_bitmap_cache_mark_load(cache_unit, page_idx, 1);

                                ret = lsv_bitmap_read_chunk(node->volume_context, 0, chunk_id, page_idx * PAGE_SIZE, PAGE_SIZE,
                                        cache_unit->bitmap_cache + page_idx * PAGE_SIZE);

                               // int ret = lsv_bitmap_read_chunk_async(node->volume_context,
                                 //                        cache_unit->bitmap_cache + page_idx * PAGE_SIZE, &entry->vio, entry);

                                //assert(!ret);
                                lsv_bitmap_cache_clear_load(cache_unit, page_idx, 1);

                                if(ret)
                                {
                                        lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                        lsv_bitmap_cache_unlock(cache_unit);

                                        errno = ret;
                                        return NULL;
                                }
                        }
                        else
                        {
                                /* Remote volume (e.g. snapshot parent): map the vvol
                                 * to its volume id and read the page remotely. */
                                LSV_DBUG("load_bitmap_page cache remote at vvol_id=%d, chunk_id=%d\r\n", vvol_id, chunk_id);

                                uint64_t vol_id;

                                lsv_bitmap_vvol_to_vol(node->volume_context, vvol_id, &vol_id);

                                ret = volume_proto_remote_read_chunk(node->volume_context, vol_id, chunk_id, page_idx * PAGE_SIZE, PAGE_SIZE,
                                                     cache_unit->bitmap_cache + page_idx * PAGE_SIZE);

                                //assert(!ret);
                                if(ret)
                                {
                                        lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                        lsv_bitmap_cache_unlock(cache_unit);

                                        errno = ret;
                                        return NULL;
                                }

                                //lsv_bitmap_cache_mark_valid(cache_unit, page_idx, 1);
                                //lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                        }

                        lsv_bitmap_cache_mark_valid(cache_unit, page_idx, 1);

                        lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);

                        if(chunk_id)
                                assert(chunk_id == cache_unit->chunk_id);
                }
        }

        if(!load_all)
        {
                /* Caller gets the requested pages read-locked; the load bits
                 * bracket the lock acquisition to protect across yields. */
                lsv_bitmap_cache_mark_load(cache_unit, page_off / PAGE_SIZE, page_len / PAGE_SIZE); //yeild protection.

                if(chunk_id)
                assert(chunk_id == cache_unit->chunk_id);

                lsv_bitmap_cache_page_rlock(cache_unit, page_off / PAGE_SIZE, page_len / PAGE_SIZE);

                if(chunk_id)
                assert(chunk_id == cache_unit->chunk_id);

                lsv_bitmap_cache_clear_load(cache_unit, page_off / PAGE_SIZE, page_len / PAGE_SIZE);

        }
        else    //mostly from snapshot.
        {
                int not_full_valid = 0;

                //alreay in cache but partial data.
                /* load_all was requested but the slot may have been populated
                 * page-by-page earlier; detect any still-invalid page. */
                for(int i=0;i<BITMAP_CHUNK_SIZE;i+= PAGE_SIZE)
                {
                        int page_idx = i / PAGE_SIZE;

                        if(!lsv_is_bitmap_cache_page_valid(cache_unit, page_idx, 1))
                        {
                                not_full_valid = 1;
                                break;
                        }
                }

                if(not_full_valid)
                {
                        if(chunk_id)
                        {
                                /* Re-read the whole chunk (local or remote) and
                                 * mark every page valid. */
                                lsv_bitmap_cache_mark_load(cache_unit, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE); //yeild protection.

                                if(cache_unit->vvol_id == node->bitmap_header->vvol_id)
                                        ret = lsv_bitmap_read_chunk(node->volume_context, 0, cache_unit->chunk_id, 0, BITMAP_CHUNK_SIZE,
                                                cache_unit->bitmap_cache);
                                else
                                {
                                        uint64_t vol_id;

                                        lsv_bitmap_vvol_to_vol(node->volume_context, vvol_id, &vol_id);

                                        ret = volume_proto_remote_read_chunk(node->volume_context, vol_id, cache_unit->chunk_id, 0, BITMAP_CHUNK_SIZE,
                                                cache_unit->bitmap_cache);
                                }
                                assert(!ret);

                                lsv_bitmap_cache_mark_valid(cache_unit, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                                lsv_bitmap_cache_clear_load(cache_unit, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                                if(chunk_id)
                                assert(chunk_id == cache_unit->chunk_id);
                        }
                        else
                        {
                                /* New chunk (chunk_id == 0): nothing on disk, start
                                 * from an all-zero bitmap.
                                 * NOTE(review): this stores chunk_id 0, while
                                 * lsv_bitmap_cache_get_internal stores -1 for chunk 0
                                 * — confirm the inconsistency is intentional. */
                                cache_unit->chunk_id = chunk_id;
                                memset(cache_unit->bitmap_cache, 0, BITMAP_CHUNK_SIZE);
                        }
                }

        }

        if(ref)
                *ref = cache_unit;

        #if LSV_BITMAP_CHECK_BITMAP_LBA

        /* Debug build: no bitmap unit in the returned range may still carry
         * the 0xff poison pattern written for not-yet-loaded pages. */
        lsv_bitmap_unit_t *pbitmap = (lsv_bitmap_unit_t *)(cache + page_off);
        for(int i=0;i<page_len/PAGE_SIZE;i++)
        {
                assert(pbitmap->lba != (uint64_t)0xffffffffffffffff);

                pbitmap++;
        }

        #else
        {
                /*row2_bitmap_unit_t *pbitmap = (row2_bitmap_unit_t *)(cache + page_off);
                for(int i=0;i<page_len/sizeof(row2_bitmap_unit_t);i++)
                {
                        assert(pbitmap->lba != (uint64_t)0xffffffffffffffff);

                        pbitmap++;
                } */
        }
        #endif


        #if LSV_BITMAP_CHECK_BITMAP_LBA
        /* Debug build: a brand-new chunk's bitmap must be all zeroes. */
        for(int i=0;chunk_id == 0 && i<CHUNK_SIZE;i++)
        {
                assert(cache[i] == 0);
        }
        #endif

        return cache;
}


/*
 * Pre-load bitmap pages [page_off, page_off + page_len) of @chunk_id into the
 * cache unit belonging to @vvol_id, without waiting for local reads.
 *
 * For each page that is not yet valid, the page write-lock is taken:
 *  - local chunks are read asynchronously; the completion callback is expected
 *    to mark the page valid, release the page lock, and free @entry
 *    (NOTE(review): callback contract inferred from the async call shape —
 *    confirm against lsv_bitmap_read_chunk_async's completion handler);
 *  - remote chunks are read synchronously, marked valid, and unlocked here.
 *
 * chunk_id == 0 means "no backing chunk" and the call is a no-op.
 */
void lsv_bitmap_cache_pre_load(struct lsv_bitmap_context * node, uint32_t vvol_id, uint32_t chunk_id, uint32_t page_off, uint32_t page_len)
{
        lsv_bitmap_cache_unit_t * cache_unit;
        int page_idx;
        int ret;

        if(!chunk_id)
                return;

AGAIN:
        lsv_bitmap_cache_get_internal(node, vvol_id, chunk_id, 0, &cache_unit);

        for(int i=0;i<page_len / PAGE_SIZE;i++)
        {
                page_idx = page_off / PAGE_SIZE + i;

                if(!lsv_is_bitmap_cache_page_valid(cache_unit, page_idx, 1))
                {
                        ret = lsv_bitmap_cache_page_try_wlock(cache_unit, page_idx);
                        if(ret)
                                continue;  //load in progress.

                        //the unit may have been recycled before we got the page lock.
                        if( ((cache_unit->vvol_id != vvol_id) && vvol_id != 0) || (cache_unit->chunk_id != chunk_id) )  //c99
                        {
                                DERROR("bitmap_cache unit chunk_id=%d has been replaced, try again...", chunk_id);

                                lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                goto AGAIN;
                        }

                        //check again, if valid no need load.
                        if(lsv_is_bitmap_cache_page_valid(cache_unit, page_idx, 1))
                        {
                                lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                                continue;
                        }

                        if(cache_unit->vvol_id == node->bitmap_header->vvol_id)
                        {
                                LSV_DBUG("load_bitmap_page cache local at vvol_id=%d, chunk_id=%d\r\n", vvol_id, chunk_id);

                                //entry ownership passes to the async completion path.
                                struct aio_list_entry * entry = (struct aio_list_entry *)xmalloc(sizeof(struct aio_list_entry));
                                lsv_lock_init(&entry->lock);
                                lsv_lock(&entry->lock);
                                entry->cache_unit = cache_unit;
                                _gettimeofday(&entry->tim, NULL);

                                lsv_volume_io_init(&entry->vio, cache_unit->chunk_id, page_idx * PAGE_SIZE, PAGE_SIZE, LSV_BITMAP_STORAGE_TYPE);

                                lsv_bitmap_cache_mark_load(cache_unit, page_idx, 1);

                                //was: "int ret = ..." — shadowed the outer ret (CERT DCL01-C).
                                ret = lsv_bitmap_read_chunk_async(node->volume_context,
                                                         cache_unit->bitmap_cache + page_idx * PAGE_SIZE, &entry->vio, entry);

                                // TODO assert
                                assert(!ret);
                        }
                        else
                        {
                                LSV_DBUG("load_bitmap_page cache remote at vvol_id=%d, chunk_id=%d\r\n", vvol_id, chunk_id);

                                uint64_t vol_id;

                                lsv_bitmap_vvol_to_vol(node->volume_context, vvol_id, &vol_id);

                                ret = volume_proto_remote_read_chunk(node->volume_context, vol_id, chunk_id, page_idx * PAGE_SIZE, PAGE_SIZE,
                                                     cache_unit->bitmap_cache + page_idx * PAGE_SIZE);

                                assert(!ret);

                                //synchronous path: mark valid and drop the page lock here;
                                //the local/async path defers both to the completion callback.
                                lsv_bitmap_cache_mark_valid(cache_unit, page_idx, 1);
                                lsv_bitmap_cache_page_unlock(cache_unit, page_idx, 1);
                        }
                }
        }
}


/* Release the cache unit's slot lock (counterpart of the slot-lock
 * acquisition done when the unit was looked up). */
void lsv_bitmap_cache_unlock(lsv_bitmap_cache_unit_t *unit)
{
        lsv_unlock(&unit->slot_lock);
}

//only release memory, no release cache entry, as the cache entry minght be used in future operating.
int lsv_bitmap_cache_release(struct lsv_bitmap_context * node)
{
        int ret;
        lsv_volume_proto_t *lsv_info = node->volume_context;
        volume_format_t format = lsv_info->volume_format;

        if(format == VOLUME_FORMAT_ROW2 || format == VOLUME_FORMAT_ROW3)
        {
                ret = lsv_bitmap_commit_dirty_node(node);
                if(!ret && node->is_session > 0)
                        node->is_session  --;

                if(ret)
                        return ret;
        }

        LSV_DBUG("enter.\r\n");

RETRY:
        for(int i=0;i<MAX_COUNT_CACHE;i++)
        {
                if(node->cache_context.bitmap_cache[i])
                {
                        if(lsv_is_bitmap_cache_dirty(node->cache_context.bitmap_cache[i])
                        || lsv_is_bitmap_cache_loading(node->cache_context.bitmap_cache[i]))
                        {
                                LSV_DBUG("cache %d dirty or loading, retry...\r\n", i);

                                schedule_sleep("bitmap", 100);

                                goto RETRY;
                        }

                        xfree(node->cache_context.bitmap_cache[i]);
                        node->cache_context.bitmap_cache[i] = NULL;
                }
        }

        return 0;
}
